/*
 * Copyright 2009 Alberto Gimeno <gimenete at gmail.com>
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */
package siena.sdb;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;

import siena.AbstractPersistenceManager;
import siena.ClassInfo;
import siena.Query;
import siena.SienaException;
import siena.Util;
import siena.sdb.ws.Item;
import siena.sdb.ws.SelectResponse;
import siena.sdb.ws.SimpleDB;

public class SdbPersistenceManager extends AbstractPersistenceManager {
	
	private static final String[] supportedOperators = { "<", ">", ">=", "<=", "=" };
	private static long ioffset = Math.abs(0l+Integer.MIN_VALUE);

	private SimpleDB ws;
	private String prefix;
	private List<String> domains;

	public void init(Properties p) {
		String awsAccessKeyId = p.getProperty("awsAccessKeyId");
		String awsSecretAccessKey = p.getProperty("awsSecretAccessKey");
		if(awsAccessKeyId == null || awsSecretAccessKey == null)
			throw new SienaException("Both awsAccessKeyId and awsSecretAccessKey properties must be set");
		prefix = p.getProperty("prefix");
		if(prefix == null) prefix = "";
		ws = new SimpleDB(awsAccessKeyId, awsSecretAccessKey);
	}

	public <T> Query<T> createQuery(Class<T> clazz) {
		return new SdbQuery<T>(clazz);
	}

	public void delete(Object obj) {
		ws.deleteAttributes(getDomainName(obj.getClass()), toItem(obj));
	}

	public void get(Object obj) {
		Item item = ws.getAttributes(getDomainName(obj.getClass()), getIdValue(obj)).item;
		fillModel(item, obj);
	}

	public void insert(Object obj) {
		ws.putAttributes(getDomainName(obj.getClass()), toItem(obj));
	}

	public void update(Object obj) {
		ws.putAttributes(getDomainName(obj.getClass()), toItem(obj));
	}

	public String getIdValue(Object obj) {
		try {
			return (String) ClassInfo.getIdField(obj.getClass()).get(obj);
		} catch (Exception e) {
			throw new SienaException(e);
		}
	}

	protected String getDomainName(Class<?> clazz) {
		String domain = prefix+ClassInfo.getClassInfo(clazz).tableName;
		if(domains == null) {
			domains = ws.listDomains(null, null).domains; // TODO pagination
		}
		if(!domains.contains(domain)) {
			ws.createDomain(domain);
		}
		return domain;
	}

	private String getAttributeName(Field field) {
		return ClassInfo.getColumnNames(field)[0];
	}

	private Object readField(Object object, Field field) {
		field.setAccessible(true);
		try {
			return field.get(object);
		} catch (Exception e) {
			throw new SienaException(e);
		}
	}
	
	private Item toItem(Object obj) {
		Item item = new Item();
		Class<?> clazz = obj.getClass();
		for (Field field : ClassInfo.getClassInfo(clazz).updateFields) {
			try {
				String value = toString(field, field.get(obj));
//				while(value.length() > 1024) { // TODO: 1024 bytes != 1024 chars
//					String a = value.substring(0, 1024);
//					value = value.substring(1024);
//					item.add(getAttributeName(field), a);
//				}
				item.add(getAttributeName(field), value);
			} catch (Exception e) {
				throw new SienaException(e);
			}
		}
		Field id = ClassInfo.getIdField(clazz);
		String name = (String) readField(obj, id);
		if(name == null) { // TODO: only if auto-generated
			try {
				name = UUID.randomUUID().toString();
				id.set(obj, name);
			} catch (Exception e) {
				throw new SienaException(e);
			}
		}
		item.name = name;
		return item;
	}
	
	private static String toString(Field field, Object object) {
		if(field.getType() == Integer.class || field.getType() == int.class) {
			return toString((Integer) object);
		}
		return Util.toString(field, object);
	}

	private void fillModel(Item item, Object obj) {
		Class<?> clazz = obj.getClass();
		for (Field field : ClassInfo.getClassInfo(clazz).updateFields) {
			List<String> values = item.attributes.get(getAttributeName(field));
			if(values == null || values.isEmpty())
				continue;
			try {
				String value = values.get(0); //Util.join(values, "");
				if(field.getType() == Integer.class || field.getType() == int.class) {
					field.set(obj, fromString(value));
				} else {
					Util.setFromString(obj, field, value);
				}
			} catch (Exception e) {
				throw new SienaException(e);
			}
		}
		
		Field id = ClassInfo.getIdField(clazz);
		// item.name = (String) readField(obj, id);
		try {
			id.set(obj, item.name);
		} catch (Exception e) {
			throw new SienaException(e);
		} // TODO not only String ids
	}
	
	private static String toString(int i) {
		return String.format("%010d", i+ioffset);
	}
	
	private static int fromString(String s) {
		long l = Long.parseLong(s);
		return (int) (l-ioffset);
	}
	
	/* transactions */

	public void beginTransaction(int isolationLevel) {
	}

	public void closeConnection() {
	}

	public void commitTransaction() {
	}

	public void rollbackTransaction() {
	}
	
	class SdbQuery<T> implements Query<T> {
		
		private Class<?> clazz;
		private List<String> filter;
		private String order;
		
		private String nextOffset;
		
		public SdbQuery(Class<?> clazz) {
			this.clazz = clazz;
		}
		
		@SuppressWarnings("unchecked")
		private List<T> query(String suffix, String nextToken) {
			String domain = getDomainName(clazz);
			String q = buildQuery("select * from "+domain, suffix);
			SelectResponse response = ws.select(q, nextToken);
			nextOffset = response.nextToken;
			List<Item> items = response.items;
			List<T> result = new ArrayList<T>(items.size());
			for (Item item : items) {
				try {
					T object = (T) clazz.newInstance();
					fillModel(item, object);
					result.add(object);
				} catch (Exception e) {
					throw new SienaException(e);
				}
			}
			return result;
		}
		
		private String buildQuery(String prefix, String suffix) {
			StringBuilder query = new StringBuilder(prefix);
			if(filter != null) {
				query.append(" where ");
				query.append(Util.join(filter, " and "));
			}
			if(order != null) {
				query.append(" order by "+order);
			}
			query.append(suffix);
			return query.toString();
		}

		@Override
		public int count() {
			// TODO Auto-generated method stub
			return 0;
		}

		@Override
		public int count(int limit) {
			// TODO Auto-generated method stub
			return 0;
		}

		@Override
		public int count(int limit, Object offset) {
			// TODO Auto-generated method stub
			return 0;
		}

		@Override
		public List<T> fetch() {
			return query("", null);
		}

		@Override
		public List<T> fetch(int limit) {
			return query(" limit "+limit, null);
		}

		@Override
		public List<T> fetch(int limit, Object offset) {
			return query(" limit "+limit, offset.toString());
		}

		@Override
		public Query<T> filter(String field, Object value) {
			if(filter == null) filter = new ArrayList<String>();
			String op = "=";
			for (String operator : supportedOperators) {
				if(field.endsWith(operator)) {
					op = operator;
					field = field.substring(0, field.length()-operator.length());
					break;
				}
			}
			field = field.trim();
			
			try {
				Field f = clazz.getDeclaredField(field);
				String column = null;
				if(ClassInfo.isId(f)) {
					column = "itemName()";
				} else {
					column = ClassInfo.getColumnNames(f)[0];
				}
				String s = SdbPersistenceManager.toString(f, value);
				filter.add(column+op+SimpleDB.quote(s));
			} catch (Exception e) {
				throw new SienaException(e);
			}
			return this;
		}

		@Override
		public T get() {
			List<T> result = fetch(1);
			if(result.isEmpty()) return null;
			return result.get(0);
		}

		@Override
		public Iterable<T> iter(String field, int max) {
			// TODO Auto-generated method stub
			return null;
		}

		@Override
		public Query<T> order(String field) {
			try {
				boolean desc = false;
				if(field.startsWith("-")) {
					field = field.substring(1);
					desc = true;
				}
				String column = ClassInfo.getColumnNames(clazz.getDeclaredField(field))[0];
				order = column;
				if(desc)
					order += " desc";
			} catch (Exception e) {
				throw new SienaException(e);
			}
			return this;
		}

		@Override
		public Query<T> search(String match, boolean inBooleanMode,
				String... fieldNames) {
			// TODO Auto-generated method stub
			return null;
		}
		
		@Override
		public SdbQuery<T> clone() {
			return null;
		}
		
		@Override
		public Object nextOffset() {
			return nextOffset;
		}

	}

}
